package com.asinking.app;



import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.asinking.debezium.DebeziumObject;
import com.asinking.debezium.MyDebeziumDeserializationSchema;
import com.asinking.sink.Sink2Ck;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AppTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        DebeziumSourceFunction<DebeziumObject> mysqlSource = MySQLSource.<DebeziumObject>builder()
                .hostname("159.75.9.84")
                .port(3306)
                .tableList("dev.iceberg")
                .username("root")
                .password("123456")
                .deserializer(new MyDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();



        DataStreamSource<DebeziumObject> mysqlStream = env.addSource(mysqlSource);

        mysqlStream
                .addSink(new Sink2Ck());


        env.execute();
    }
}
